Streaming LLM Responses
Reading time: ~40 minutes | Level: Advanced
The Puzzle
Before reading further, predict what this code will do when connected to a real LLM API:
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-opus-4-5",
    max_tokens=200,
    messages=[{"role": "user", "content": "Count from 1 to 10."}],
) as stream:
    full_text = ""
    for chunk in stream:
        # What is the type of chunk?
        # Does every chunk contain text?
        # How do you accumulate the full response?
        pass
    # After the loop, what does stream.get_final_message() return?
    # How many API calls were made?
The surprising answers: chunks have multiple types (message_start, content_block_start, content_block_delta, content_block_stop, message_delta, message_stop). Most chunks contain no text at all -- they carry metadata, usage information, or bookkeeping events. A single stream for a short response can produce 15-25 separate chunk objects. And stream.get_final_message() gives you a complete assembled message with usage statistics, re-using the single API call you already made.
Understanding the streaming protocol at this level is what separates engineers who build robust streaming pipelines from those who hit mysterious bugs in production.
What You Will Learn
- Why streaming matters: UX, time-to-first-token, long outputs
- How streaming works under the hood: SSE, chunked HTTP transfer
- The Anthropic and OpenAI streaming APIs in Python
- Async generators for streaming: yield inside async def
- Building a FastAPI streaming endpoint with StreamingResponse
- Accumulating streamed output: partial JSON, detecting tool call chunks mid-stream
- Streaming with tool use: detecting and assembling tool call arguments
- Frontend patterns: EventSource in JavaScript, React hooks
- Error handling in streams: connection drops and mid-stream errors
- Testing streaming endpoints
Part 1 -- Why Streaming Matters
Consider a user asking your LLM assistant to write a technical blog post. Without streaming:
- User submits request
- User stares at a spinner for 25 seconds
- Full 2,000-word post appears all at once
With streaming:
- User submits request
- First words appear within 300ms
- Content flows word by word, exactly like watching someone type
The UX difference is enormous. Users generally tolerate a much longer total generation time when output is streamed than when it arrives in bulk, because they can start reading immediately. For chat interfaces, time-to-first-token (TTFT) is therefore usually the latency metric that matters most.
But streaming matters beyond UX:
Long output handling. Without streaming, your HTTP connection must stay open and silent for the entire generation time. To a load balancer or proxy with an idle timeout, a 60-second response that sends no data looks the same as a hung connection, and many will terminate it. Streaming sends data continuously, which keeps the connection active.
Memory efficiency. Non-streaming collects the full response in memory on the server before sending anything. With streaming, you forward chunks immediately and never buffer the complete response.
Early termination. With streaming, users can stop generation mid-way through if they have seen enough. Without streaming, you pay for the full generation whether or not it was useful.
Part 2 -- How Streaming Works Under the Hood
LLM APIs stream using Server-Sent Events (SSE), a standard HTTP mechanism for unidirectional server-to-client push over a persistent connection.
Each SSE event looks like this on the wire:
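event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " answer"}}

event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " is"}}

event: message_stop
data: {"type": "message_stop"}

Note the blank line between events -- that is the SSE delimiter. Anthropic names each event with an event: line and ends the stream with message_stop. OpenAI sends unnamed data: lines and finishes with a literal data: [DONE] sentinel. Python SDKs parse this for you. Understanding the wire format helps when debugging network issues or building a custom SSE client.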
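If you do need a custom client, the delimiter rule can be sketched as a small incremental parser. This is a minimal illustration, not the SDK's implementation: SSEParser and iter_json_payloads are hypothetical names, and only the event: and data: fields are handled.

```python
import json
from typing import Iterator


class SSEParser:
    """Incrementally splits received text into SSE events (hypothetical helper)."""

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, text: str) -> Iterator[dict]:
        """Add newly received text and yield every event that is now complete."""
        # Normalize CRLF on the whole buffer so a "\r" left at the end of one
        # read is still paired with the "\n" that starts the next read.
        self._buffer = (self._buffer + text).replace("\r\n", "\n")
        while "\n\n" in self._buffer:
            raw_event, self._buffer = self._buffer.split("\n\n", 1)
            event_name = "message"  # SSE default when no "event:" field is present
            data_lines = []
            for line in raw_event.split("\n"):
                if line.startswith(":"):
                    continue  # comment line, often used as a keep-alive ping
                field, _, value = line.partition(":")
                if value.startswith(" "):
                    value = value[1:]  # a single leading space is not part of the value
                if field == "event":
                    event_name = value
                elif field == "data":
                    data_lines.append(value)
            if data_lines:
                yield {"event": event_name, "data": "\n".join(data_lines)}


def iter_json_payloads(parser: SSEParser, text: str) -> Iterator[dict]:
    """Decode each complete event's data as JSON, skipping a [DONE] sentinel."""
    for event in parser.feed(text):
        if event["data"] != "[DONE]":
            yield json.loads(event["data"])
```

The parser keeps any incomplete tail in its buffer, so an event that is split across two network reads is emitted only once its closing blank line has arrived.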
Chunked Transfer Encoding
SSE defines the format of the response body. On HTTP/1.1, that body is typically delivered with chunked transfer encoding, which is the transport underneath SSE rather than an alternative to it. Instead of setting Content-Length upfront (which would require buffering the full response), the server sends the body in chunks, each prefixed with its length in hexadecimal:
HTTP/1.1 200 OK
Transfer-Encoding: chunked
Content-Type: text/event-stream
1a
data: {"type": "message_start"}
2b
data: {"type": "content_block_delta", ...}
0
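The 0 at the end signals the final chunk (the chunk sizes shown are illustrative). HTTP/2 connections do not use chunked encoding; they carry the body in DATA frames instead. Again, the SDK and its HTTP client handle this transparently.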
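To make the length-prefix framing concrete, here is a minimal sketch of a chunked-body decoder. The function names are hypothetical, and it assumes the whole body is already in memory and carries no trailer headers; a real HTTP client decodes incrementally.

```python
def encode_chunk(data: bytes) -> bytes:
    """Frame one chunk: hex length, CRLF, payload, CRLF."""
    return f"{len(data):x}".encode("ascii") + b"\r\n" + data + b"\r\n"


def decode_chunked(body: bytes) -> bytes:
    """Decode an HTTP/1.1 chunked body into the payload it carries."""
    payload = bytearray()
    pos = 0
    while True:
        # Each chunk starts with its size in hexadecimal, terminated by CRLF.
        line_end = body.index(b"\r\n", pos)
        size_field = body[pos:line_end].split(b";", 1)[0]  # drop chunk extensions
        size = int(size_field.decode("ascii"), 16)
        pos = line_end + 2
        if size == 0:
            break  # the zero-length chunk marks the end of the body
        payload += body[pos:pos + size]
        pos += size + 2  # skip the chunk data and its trailing CRLF
    return bytes(payload)
```

Decoding two framed SSE events followed by the terminating 0 chunk returns the two events concatenated, which is exactly the text an SSE parser then splits on blank lines.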
Part 3 -- Python SDK Streaming
Anthropic Streaming
Anthropic's Python SDK provides a context manager-based streaming API that handles event parsing and assembly:
import anthropic

client = anthropic.Anthropic()

# The stream context manager handles connection lifecycle
with client.messages.stream(
    model="claude-opus-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain transformer attention."}],
) as stream:
    # text_stream filters the raw events and yields only text deltas
    # This is the simplest way to get streaming text
    for text in stream.text_stream:
        print(text, end="", flush=True)
        # Each `text` is a small string fragment, often 1-4 words

    # Once the text has been consumed, get the complete assembled message.
    # Call it inside the block, while the stream is still open.
    # This does NOT make another API call -- it returns what was already received
    final_message = stream.get_final_message()

print(f"\nTotal tokens: {final_message.usage.input_tokens} in, "
      f"{final_message.usage.output_tokens} out")
For lower-level control over every event type:
with client.messages.stream(
    model="claude-opus-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "..."}],
) as stream:
    for event in stream:
        # event.type is one of:
        #   "message_start"       -- contains model, id, usage.input_tokens
        #   "content_block_start" -- start of a content block (text or tool_use)
        #   "content_block_delta" -- a text fragment or tool_input fragment
        #   "content_block_stop"  -- end of a content block
        #   "message_delta"       -- final usage.output_tokens
        #   "message_stop"        -- stream is done
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
            elif event.delta.type == "input_json_delta":
                # Tool call argument fragment -- discussed in Part 6
                print(f"[tool arg fragment: {event.delta.partial_json!r}]")
        elif event.type == "message_delta":
            # This event carries final token counts
            print(f"\nOutput tokens: {event.usage.output_tokens}")
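The event sequence can also be replayed offline, which makes it easier to see how few events carry text. The sketch below assumes events are plain dicts shaped like the wire JSON (the SDK yields typed objects instead); accumulate_text and SAMPLE_EVENTS are illustrative names, and the sample is hand-written rather than captured from the API.

```python
from collections import Counter


def accumulate_text(events: list[dict]) -> tuple[str, Counter]:
    """Join the text deltas and count how many events of each type were seen."""
    text_parts: list[str] = []
    counts: Counter = Counter()
    for event in events:
        counts[event["type"]] += 1
        if event["type"] != "content_block_delta":
            continue  # metadata or bookkeeping event: nothing to append
        delta = event["delta"]
        if delta["type"] == "text_delta":
            text_parts.append(delta["text"])
    return "".join(text_parts), counts


# Hand-written sample following the event order described above.
SAMPLE_EVENTS = [
    {"type": "message_start"},
    {"type": "content_block_start", "index": 0},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "1, 2"}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": ", 3"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "message_delta"},
    {"type": "message_stop"},
]
```

For this sample, only two of the seven events contribute to the assembled text; the rest mark boundaries or carry metadata.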
OpenAI Streaming
import openai

client = openai.OpenAI()

# stream=True enables streaming mode
stream = client.chat.completions.create(
    model="gpt-4o",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain transformer attention."}],
    stream=True,
)

# Iterate over the stream -- each chunk is a ChatCompletionChunk
accumulated = ""
for chunk in stream:
    # Some chunks carry no choices (for example, a usage-only chunk),
    # so guard before indexing
    if not chunk.choices:
        continue
    choice = chunk.choices[0]
    # delta.content is the text fragment, or None for non-text chunks
    delta = choice.delta.content
    if delta is not None:
        print(delta, end="", flush=True)
        accumulated += delta
    # finish_reason is set on the final chunk
    if choice.finish_reason is not None:
        print(f"\nFinished: {choice.finish_reason}")
Async Streaming
For use in FastAPI or other async contexts, both SDKs provide async streaming:
import anthropic
import asyncio


async def stream_response(prompt: str) -> str:
    """
    Async streaming with Anthropic. Returns the full response text
    after printing each chunk as it arrives.
    """
    client = anthropic.AsyncAnthropic()
    accumulated = ""
    async with client.messages.stream(
        model="claude-opus-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
            accumulated += text
    print()  # newline after streaming
    return accumulated


asyncio.run(stream_response("What is the capital of France?"))
Part 4 -- Async Generators for Streaming
The real power of Python's async streaming integration comes from async generators. You can create a pipeline where each stage transforms the stream:
import asyncio
from typing import AsyncIterator
import anthropic
async def raw_text_stream(
prompt: str,
model: str = "claude-opus-4-5",
max_tokens: int = 1024,
) -> AsyncIterator[str]:
"""
Async generator that yields text fragments from an LLM.
The caller drives the consumption pace: a chunk is read only when requested.
"""
client = anthropic.AsyncAnthropic()
async with client.messages.stream(
model=model,
max_tokens=max_tokens,
messages=[{"role": "user", "content": prompt}],
) as stream:
async for text in stream.text_stream:
yield text
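# yield suspends here -- the next chunk is not read from the connection
# until the caller calls __anext__() again. The model keeps generating
# server-side in the meantime; only the local read is paced.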
async def word_stream(prompt: str) -> AsyncIterator[str]:
"""
Transform a character/token stream into a word stream.
Buffers partial words until a space or punctuation appears.
"""
buffer = ""
async for chunk in raw_text_stream(prompt):
buffer += chunk
# Emit complete words (split on whitespace)
while " " in buffer or "\n" in buffer:
# Find the first word boundary
for i, char in enumerate(buffer):
if char in " \n":
word = buffer[:i+1]
buffer = buffer[i+1:]
yield word
break
# Emit any remaining text (last word may not have trailing space)
if buffer:
yield buffer
async def sentence_stream(prompt: str) -> AsyncIterator[str]:
"""
Transform a token stream into complete sentences.
Useful for TTS (text-to-speech) pipelines where you need
complete sentences before synthesizing audio.
"""
buffer = ""
sentence_endings = {'.', '!', '?'}
async for chunk in raw_text_stream(prompt):
buffer += chunk
# Emit complete sentences
while any(c in buffer for c in sentence_endings):
for i, char in enumerate(buffer):
if char in sentence_endings:
# Check it is actually a sentence end (not "Dr." or "U.S.")
remaining = buffer[i+1:].lstrip()
if remaining and remaining[0].isupper():
sentence = buffer[:i+1]
buffer = buffer[i+1:].lstrip()
yield sentence
break
else:
break # No complete sentence found -- wait for more chunks
if buffer.strip():
yield buffer.strip()
# Compose the pipeline
async def main():
prompt = "Explain quantum entanglement in three sentences."
async for sentence in sentence_stream(prompt):
print(f"Sentence: {sentence!r}")
# This is where you would send to TTS, update UI, etc.
asyncio.run(main())
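The stages above each call raw_text_stream directly, which ties them to the API. A variation, sketched here under the assumption that a stage should accept any async iterator of strings, lets the same logic run against a fake source in tests. The names fake_source, words, and collect are hypothetical.

```python
import asyncio
from typing import AsyncIterator


async def fake_source(chunks: list[str]) -> AsyncIterator[str]:
    """Stand-in for an LLM text stream that yields pre-set fragments."""
    for chunk in chunks:
        await asyncio.sleep(0)  # give the event loop a turn, as a network read would
        yield chunk


async def words(source: AsyncIterator[str]) -> AsyncIterator[str]:
    """Re-chunk arbitrary text fragments into whitespace-terminated words."""
    buffer = ""
    async for chunk in source:
        buffer += chunk
        while True:
            # Index of the first space or newline, or None if there is none yet.
            cut = next((i for i, ch in enumerate(buffer) if ch in " \n"), None)
            if cut is None:
                break  # no complete word yet: wait for the next fragment
            yield buffer[:cut + 1]
            buffer = buffer[cut + 1:]
    if buffer:
        yield buffer  # the last word may have no trailing whitespace


async def collect(stream: AsyncIterator[str]) -> list[str]:
    """Drain an async iterator into a list."""
    return [item async for item in stream]
```

Because words takes its input as a parameter, stages compose by nesting calls, for example words(fake_source([...])) in a test and words(raw_text_stream(prompt)) in production.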
Part 5 -- FastAPI Streaming Endpoints
The most common production use case: a FastAPI endpoint that streams LLM output to a browser. FastAPI's StreamingResponse handles the HTTP plumbing:
from typing import AsyncIterator

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic
import asyncio
import json

app = FastAPI()
client = anthropic.AsyncAnthropic()


class ChatRequest(BaseModel):
    message: str
    history: list[dict] = []


async def generate_stream(
    message: str,
    history: list[dict],
) -> AsyncIterator[str]:
    """
    Async generator that yields Server-Sent Events formatted strings.
    SSE format: "data: {json}\n\n"
    """
    messages = history + [{"role": "user", "content": message}]
    try:
        async with client.messages.stream(
            model="claude-opus-4-5",
            max_tokens=2048,
            system="You are a helpful AI assistant.",
            messages=messages,
        ) as stream:
            async for text in stream.text_stream:
                # Wrap each text chunk as an SSE event
                event = json.dumps({"type": "text", "content": text})
                yield f"data: {event}\n\n"
            # Send a final event with token usage.
            # On the async client, get_final_message() is a coroutine
            # and must be awaited.
            final = await stream.get_final_message()
            done_event = json.dumps({
                "type": "done",
                "input_tokens": final.usage.input_tokens,
                "output_tokens": final.usage.output_tokens,
            })
            yield f"data: {done_event}\n\n"
    except anthropic.RateLimitError:
        error_event = json.dumps({"type": "error", "message": "Rate limit exceeded"})
        yield f"data: {error_event}\n\n"
    except anthropic.APIConnectionError:
        error_event = json.dumps({"type": "error", "message": "Connection failed"})
        yield f"data: {error_event}\n\n"
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    return StreamingResponse(
        generate_stream(request.message, request.history),
        media_type="text/event-stream",
        headers={
            # Disable response buffering in nginx
            "X-Accel-Buffering": "no",
            # Prevent caches and proxies from storing the stream
            "Cache-Control": "no-cache",
            # Ask HTTP/1.1 intermediaries to keep the connection open
            "Connection": "keep-alive",
        },
    )
The X-Accel-Buffering: no header is critical if you run behind nginx. By default, nginx buffers proxy responses. Without this header, the browser will not receive any chunks until nginx's buffer fills (typically 4KB or 8KB), defeating the purpose of streaming.
Handling Client Disconnects
When a user navigates away, the client closes the connection. Without disconnect detection, your server will keep generating tokens and billing you for output nobody reads:
import asyncio
from fastapi import Request
async def generate_stream_with_disconnect(
request: Request,
message: str,
history: list[dict],
) -> AsyncIterator[str]:
"""Streams LLM output and stops generation if the client disconnects."""
async with client.messages.stream(
model="claude-opus-4-5",
max_tokens=2048,
messages=history + [{"role": "user", "content": message}],
) as stream:
async for text in stream.text_stream:
# Check if client is still connected
if await request.is_disconnected():
# Stop the generator -- this will trigger the stream's
# context manager __aexit__ which closes the connection
return
event = json.dumps({"type": "text", "content": text})
yield f"data: {event}\n\n"
@app.post("/chat/stream")
async def chat_stream(request: Request, body: ChatRequest):
return StreamingResponse(
generate_stream_with_disconnect(request, body.message, body.history),
media_type="text/event-stream",
headers={"X-Accel-Buffering": "no", "Cache-Control": "no-cache"},
)
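The early return works because leaving the generator unwinds the async with block, which runs the stream's __aexit__. The sketch below demonstrates that mechanism with a stand-in stream, since the effect is hard to observe against the real API. FakeUpstream, relay, and demo are hypothetical names, and the disconnect check is simulated rather than taken from a real Request.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


class FakeUpstream:
    """Stand-in for an SDK stream; records whether it was closed."""

    def __init__(self, chunks: list[str]) -> None:
        self.chunks = chunks
        self.closed = False

    async def __aenter__(self) -> "FakeUpstream":
        return self

    async def __aexit__(self, *exc_info) -> None:
        self.closed = True  # a real stream would close the HTTP connection here

    def __aiter__(self) -> AsyncIterator[str]:
        return self._chunks_iter()

    async def _chunks_iter(self) -> AsyncIterator[str]:
        for chunk in self.chunks:
            yield chunk


async def relay(
    upstream: FakeUpstream,
    is_disconnected: Callable[[], Awaitable[bool]],
) -> AsyncIterator[str]:
    """Forward chunks until the client is reported as disconnected."""
    async with upstream as stream:
        async for chunk in stream:
            if await is_disconnected():
                return  # leaving the generator runs __aexit__ on the way out
            yield chunk


async def demo() -> tuple[list[str], bool]:
    upstream = FakeUpstream(["a", "b", "c", "d"])
    sent: list[str] = []

    async def is_disconnected() -> bool:
        return len(sent) >= 2  # pretend the client left after two chunks

    async for chunk in relay(upstream, is_disconnected):
        sent.append(chunk)
    return sent, upstream.closed
```

Note that the check runs only when a chunk arrives, so a disconnect is noticed at the next chunk rather than immediately.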
Part 6 -- Streaming with Tool Use
Tool use in streaming is significantly more complex than text streaming. Tool call arguments arrive as fragments and must be reassembled before dispatching the tool. Here is what the event stream looks like for a tool call:
message_start -- input_tokens count
content_block_start -- block index 0, type "tool_use", name "get_weather"
content_block_delta -- partial_json: '{"loca'
content_block_delta -- partial_json: 'tion": "San F'
content_block_delta -- partial_json: 'rancisco"}'
content_block_stop -- block 0 complete
message_delta -- stop_reason "tool_use", output_tokens count
message_stop
The tool call arguments arrive as fragments of a JSON string. You cannot parse them as JSON until the block is complete. Here is a full implementation:
import json
import anthropic
from dataclasses import dataclass, field
from typing import AsyncIterator


@dataclass
class ToolCallAccumulator:
    """Assembles a tool call from streaming fragments."""
    tool_name: str = ""
    tool_use_id: str = ""
    partial_json: str = ""  # Accumulates JSON argument fragments

    def add_fragment(self, fragment: str) -> None:
        self.partial_json += fragment

    def parse_args(self) -> dict:
        """
        Parse the accumulated JSON. Raises json.JSONDecodeError
        (a ValueError subclass) if the JSON is incomplete.
        """
        # A tool called with no arguments can leave the buffer empty;
        # treat that as an empty object instead of a parse error
        return json.loads(self.partial_json or "{}")
def get_weather(location: str, unit: str = "celsius") -> dict:
"""Simulated tool implementation."""
return {"temperature": 22, "condition": "sunny", "location": location}
TOOLS = [
{
"name": "get_weather",
"description": "Get current weather for a location.",
"input_schema": {
"type": "object",
"properties": {
"location": {"type": "string", "description": "City name"},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
},
"required": ["location"],
},
}
]
TOOL_REGISTRY = {"get_weather": get_weather}
async def agentic_stream(user_message: str) -> AsyncIterator[str]:
"""
Full agentic loop with streaming and tool use.
Yields text fragments for display and handles tool calls internally.
"""
client = anthropic.AsyncAnthropic()
messages = [{"role": "user", "content": user_message}]
while True:
# Track tool calls being assembled
current_tool: ToolCallAccumulator | None = None
# Track all tool calls in this response (there may be multiple)
completed_tool_calls: list[dict] = []
# Accumulate all content blocks for the assistant message
assistant_content_blocks: list[dict] = []
current_text = ""
async with client.messages.stream(
model="claude-opus-4-5",
max_tokens=1024,
tools=TOOLS,
messages=messages,
) as stream:
async for event in stream:
if event.type == "content_block_start":
if event.content_block.type == "tool_use":
# Start of a new tool call
current_tool = ToolCallAccumulator(
tool_name=event.content_block.name,
tool_use_id=event.content_block.id,
)
if current_text:
# Save accumulated text block
assistant_content_blocks.append({
"type": "text", "text": current_text
})
current_text = ""
elif event.type == "content_block_delta":
if event.delta.type == "text_delta":
# Regular text fragment -- yield to caller
yield event.delta.text
current_text += event.delta.text
elif event.delta.type == "input_json_delta":
# Tool argument fragment -- accumulate, do NOT yield
if current_tool:
current_tool.add_fragment(event.delta.partial_json)
elif event.type == "content_block_stop":
if current_tool is not None:
# Tool call is complete -- parse and save
try:
args = current_tool.parse_args()
except json.JSONDecodeError as e:
yield f"\n[Error parsing tool args: {e}]\n"
return
assistant_content_blocks.append({
"type": "tool_use",
"id": current_tool.tool_use_id,
"name": current_tool.tool_name,
"input": args,
})
completed_tool_calls.append({
"id": current_tool.tool_use_id,
"name": current_tool.tool_name,
"args": args,
})
current_tool = None
# After the stream, check if we need to execute tools
# On the async client, get_final_message() is a coroutine and must be awaited
final = await stream.get_final_message()
if current_text:
assistant_content_blocks.append({"type": "text", "text": current_text})
# Save the assistant's turn (may contain text + tool use blocks)
messages.append({"role": "assistant", "content": assistant_content_blocks})
if final.stop_reason != "tool_use":
# No tool calls -- conversation is done
break
# Execute all tool calls and add results to messages
tool_results = []
for call in completed_tool_calls:
tool_fn = TOOL_REGISTRY.get(call["name"])
if tool_fn is None:
result = {"error": f"Unknown tool: {call['name']}"}
else:
try:
result = tool_fn(**call["args"])
except Exception as e:
result = {"error": str(e)}
tool_results.append({
"type": "tool_result",
"tool_use_id": call["id"],
"content": json.dumps(result),
})
messages.append({"role": "user", "content": tool_results})
# Loop continues -- the model will now respond with the tool results
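The loop above tracks one current_tool at a time. Every content block event also carries an index, so a variation is to key the accumulators by that index and let content_block_stop look up the right one. This is a sketch of that design under the assumption that events expose the index as shown in the event listing; ToolCallAssembler and its method names are hypothetical.

```python
import json


class ToolCallAssembler:
    """Assembles tool calls from streamed fragments, keyed by block index."""

    def __init__(self) -> None:
        self._pending: dict[int, dict] = {}
        self.completed: list[dict] = []

    def start(self, index: int, tool_use_id: str, name: str) -> None:
        """Call on content_block_start when the block type is tool_use."""
        self._pending[index] = {"id": tool_use_id, "name": name, "json": ""}

    def add_fragment(self, index: int, partial_json: str) -> None:
        """Call on each input_json_delta for the block at this index."""
        self._pending[index]["json"] += partial_json

    def stop(self, index: int) -> None:
        """Call on content_block_stop; parses only now that the JSON is whole."""
        call = self._pending.pop(index, None)
        if call is None:
            return  # the block at this index was text, not a tool call
        # A tool called with no arguments may leave the buffer empty.
        args = json.loads(call["json"] or "{}")
        self.completed.append({"id": call["id"], "name": call["name"], "args": args})
```

Feeding it the three fragments from the event listing above yields a single completed call whose args equal {"location": "San Francisco"}.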
Part 7 -- Frontend Patterns
JavaScript EventSource (Native SSE)
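The browser's EventSource API handles SSE natively, but it can only issue GET requests. It therefore works with a GET variant of your endpoint, not with the POST /chat/stream route defined above: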
// Native browser EventSource API
const source = new EventSource('/chat/stream', {
// EventSource only supports GET. For POST, use fetch with ReadableStream.
});
source.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'text') {
// Append to the UI
document.getElementById('response').textContent += data.content;
} else if (data.type === 'done') {
source.close();
console.log(`Tokens used: ${data.input_tokens} in, ${data.output_tokens} out`);
} else if (data.type === 'error') {
source.close();
console.error('Error:', data.message);
}
};
source.onerror = () => {
source.close();
console.error('Stream connection lost');
};
Fetch API with ReadableStream (for POST endpoints)
For POST requests (which EventSource does not support), use the Fetch API with manual stream reading:
async function streamChat(message, history) {
const response = await fetch('/chat/stream', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({message, history}),
signal: AbortSignal.timeout(120_000), // 2-minute timeout
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}
// response.body is a ReadableStream of Uint8Array chunks
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const {done, value} = await reader.read();
if (done) break;
// Decode the chunk and add to buffer
buffer += decoder.decode(value, {stream: true});
// Parse complete SSE events (delimited by \n\n)
const events = buffer.split('\n\n');
buffer = events.pop(); // Keep incomplete event in buffer
for (const event of events) {
if (!event.startsWith('data: ')) continue;
const jsonStr = event.slice(6); // Remove "data: " prefix
const data = JSON.parse(jsonStr);
if (data.type === 'text') {
appendToUI(data.content);
} else if (data.type === 'done') {
updateTokenCount(data.input_tokens, data.output_tokens);
}
}
}
}
React Hook for Streaming
import {useState, useCallback, useRef} from 'react';
function useStreamingChat() {
const [content, setContent] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const [error, setError] = useState(null);
const abortRef = useRef(null);
const sendMessage = useCallback(async (message, history = []) => {
// Abort any in-progress stream
if (abortRef.current) {
abortRef.current.abort();
}
const controller = new AbortController();
abortRef.current = controller;
setContent('');
setIsStreaming(true);
setError(null);
try {
const response = await fetch('/chat/stream', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({message, history}),
signal: controller.signal,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const {done, value} = await reader.read();
if (done) break;
buffer += decoder.decode(value, {stream: true});
const events = buffer.split('\n\n');
buffer = events.pop();
for (const event of events) {
if (!event.startsWith('data: ')) continue;
const data = JSON.parse(event.slice(6));
if (data.type === 'text') {
// Functional update to avoid stale closure
setContent(prev => prev + data.content);
}
}
}
} catch (err) {
if (err.name !== 'AbortError') {
setError(err.message);
}
} finally {
setIsStreaming(false);
abortRef.current = null;
}
}, []);
const stop = useCallback(() => {
if (abortRef.current) {
abortRef.current.abort();
}
}, []);
return {content, isStreaming, error, sendMessage, stop};
}
Part 8 -- Error Handling in Streams
Errors in streams are more complex than errors in one-shot API calls. The connection can drop mid-stream, after partial output has already been sent to the client.
Connection Drop Recovery
import asyncio
import anthropic
from typing import AsyncIterator
async def resilient_stream(
prompt: str,
max_retries: int = 3,
) -> AsyncIterator[str]:
"""
Streams with automatic recovery from connection drops.
On reconnect, resumes from where the stream was cut off
by asking the model to continue from the partial output.
"""
client = anthropic.AsyncAnthropic()
accumulated = ""
messages = [{"role": "user", "content": prompt}]
attempts = 0
while attempts < max_retries:
try:
async with client.messages.stream(
model="claude-opus-4-5",
max_tokens=2048,
messages=messages,
) as stream:
async for text in stream.text_stream:
accumulated += text
yield text
# Stream completed successfully
return
except anthropic.APIConnectionError as e:
attempts += 1
if attempts >= max_retries:
yield f"\n[Stream interrupted after {len(accumulated)} chars: {e}]"
return
# Wait before retrying
wait = 2 ** attempts
yield f"\n[Connection lost. Retrying in {wait}s...]\n"
await asyncio.sleep(wait)
# Resume: ask the model to continue from where it stopped
if accumulated:
messages = [
{"role": "user", "content": prompt},
{"role": "assistant", "content": accumulated},
{
"role": "user",
"content": "Continue from where you left off.",
},
]
except anthropic.RateLimitError:
yield "\n[Rate limit hit. Please try again in a moment.]\n"
return
Partial JSON Accumulation
When streaming JSON output (e.g., structured extraction), the model produces partial JSON that cannot be parsed until the stream completes. The function below forwards chunks as they arrive and validates the JSON once, after the stream ends:
import json
import anthropic
from typing import AsyncIterator
async def stream_json_output(prompt: str) -> AsyncIterator[str]:
"""
Stream JSON output from an LLM.
Yields raw text chunks as they arrive.
Validates and parses JSON only at the end.
"""
client = anthropic.AsyncAnthropic()
accumulated = ""
async with client.messages.stream(
model="claude-opus-4-5",
max_tokens=1024,
system="You output only valid JSON. No preamble, no explanation.",
messages=[{"role": "user", "content": prompt}],
) as stream:
async for text in stream.text_stream:
accumulated += text
yield text # Forward chunks for display/progress indication
# After stream ends, validate the complete JSON
# Strip markdown code fences if model included them
json_text = accumulated.strip()
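if json_text.startswith("```"):
# Drop the opening fence line and the closing fence; partition() avoids
# an IndexError when the output contains no newline
json_text = json_text.partition("\n")[2].rsplit("```", 1)[0].strip()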
try:
parsed = json.loads(json_text)
# At this point you have a valid Python dict/list
yield f"\n<!-- PARSED: {json.dumps(parsed)} -->" # For debugging
except json.JSONDecodeError as e:
raise ValueError(
f"Model produced invalid JSON: {e}\nRaw output: {accumulated[:500]}"
)
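If you want a signal before the stream ends, one option is a cheap structural check on the accumulated prefix. The sketch below tracks bracket depth while skipping string contents. It is a heuristic under the assumption that the output is a single JSON object or array, and json.loads remains the authority on validity. Both function names are hypothetical.

```python
def json_nesting_depth(text: str) -> int:
    """Return how many objects and arrays are still open in a JSON prefix."""
    depth = 0
    in_string = False
    escaped = False
    for ch in text:
        if in_string:
            if escaped:
                escaped = False  # this character was escaped; ignore it
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch in "{[":
            depth += 1
        elif ch in "}]":
            depth -= 1
    return depth


def looks_complete(text: str) -> bool:
    """True when the prefix starts a container and every bracket is closed."""
    stripped = text.strip()
    return bool(stripped) and stripped[0] in "{[" and json_nesting_depth(stripped) == 0
```

A caller could run looks_complete on the accumulated text after each chunk and attempt json.loads only when it returns True, which avoids repeated parse failures on obviously unfinished output.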
Part 9 -- Testing Streaming Endpoints
Testing streaming endpoints requires consuming the full stream and asserting on the assembled output.
Unit Testing Async Generators
import json

import pytest
from httpx import ASGITransport, AsyncClient
from unittest.mock import MagicMock, patch


class MockStream:
    """Mock Anthropic stream for unit tests."""

    def __init__(self, text_chunks: list[str]):
        self._chunks = text_chunks

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        pass

    @property
    def text_stream(self):
        return self._async_text_stream()

    async def _async_text_stream(self):
        for chunk in self._chunks:
            yield chunk

    async def get_final_message(self):
        # Async, because get_final_message() on the real async stream is awaited
        mock_msg = MagicMock()
        mock_msg.usage.input_tokens = 10
        mock_msg.usage.output_tokens = len(self._chunks) * 2
        mock_msg.stop_reason = "end_turn"
        return mock_msg


@pytest.mark.asyncio
async def test_raw_text_stream():
    """Test that raw_text_stream yields chunks from the SDK."""
    expected_chunks = ["Hello", " world", "!"]
    # raw_text_stream (Part 4) builds its own client, so patch the class
    # it instantiates and configure the instance that the call returns
    with patch("anthropic.AsyncAnthropic") as mock_client_cls:
        mock_client_cls.return_value.messages.stream.return_value = MockStream(
            expected_chunks
        )
        chunks = []
        async for chunk in raw_text_stream("Say hello"):
            chunks.append(chunk)
    assert chunks == expected_chunks
    assert "".join(chunks) == "Hello world!"


@pytest.mark.asyncio
async def test_streaming_endpoint():
    """Test the FastAPI streaming endpoint using HTTPX async client."""
    from app import app  # Your FastAPI app

    with patch(
        "app.client.messages.stream",
        return_value=MockStream(["The", " answer", " is", " 42"]),
    ):
        # httpx 0.28 removed the app= shortcut; pass an ASGITransport instead
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as ac:
            async with ac.stream(
                "POST",
                "/chat/stream",
                json={"message": "What is 6 times 7?"},
            ) as response:
                assert response.status_code == 200
                # Consume the whole stream first: an SSE event may span chunks
                body = ""
                async for chunk in response.aiter_text():
                    body += chunk

    # Parse SSE events
    full_content = ""
    for line in body.split("\n\n"):
        if line.startswith("data: "):
            data = json.loads(line[6:])
            if data.get("type") == "text":
                full_content += data["content"]
    assert full_content == "The answer is 42"
Integration Testing with Real API (Use Sparingly)
@pytest.mark.integration
@pytest.mark.asyncio
async def test_streaming_real_api():
"""
Integration test against the real Anthropic API.
Mark with pytest.mark.integration and run only in CI
with ANTHROPIC_API_KEY set. Never run in every test run --
this costs money.
"""
import os
if not os.environ.get("ANTHROPIC_API_KEY"):
pytest.skip("ANTHROPIC_API_KEY not set")
chunks = []
async for chunk in raw_text_stream("Say 'test' and nothing else."):
chunks.append(chunk)
full_text = "".join(chunks)
assert len(chunks) > 0 # We got at least one chunk
assert "test" in full_text.lower() # The model said what we asked
assert len(full_text) < 100 # It did not ramble
Production Streaming Architecture
Key engineering decisions in this architecture:
- The FastAPI server acts as a streaming proxy -- it does not buffer the full LLM response before sending to the client.
- Side effects (logging, cost tracking, conversation storage) run concurrently via background tasks, not in the hot path.
- The X-Accel-Buffering: no header tells nginx not to buffer the response, so chunks arrive at the browser immediately.
- Token counts are accumulated during streaming and written to the cost ledger in a single write after the stream completes (not per-chunk).
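The single-write decision can be sketched as a wrapper generator that forwards chunks untouched and records totals once, when the stream ends. This is an illustration only: with_usage_ledger and the record callback are hypothetical, and character counts stand in for the token counts that a real implementation would read from the final message's usage.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def with_usage_ledger(
    source: AsyncIterator[str],
    record: Callable[[dict], Awaitable[None]],
) -> AsyncIterator[str]:
    """Forward chunks unchanged and write one ledger entry when the stream ends."""
    chars = 0
    chunks = 0
    completed = False
    try:
        async for chunk in source:
            chars += len(chunk)  # accumulate in memory only; no I/O per chunk
            chunks += 1
            yield chunk
        completed = True
    finally:
        # One write per stream, whether it finished or was cut short.
        await record({"chunks": chunks, "chars": chars, "completed": completed})
```

Keeping the per-chunk work to two integer additions leaves the hot path free of I/O, and the completed flag lets the ledger distinguish full generations from ones the client abandoned.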
Key Takeaways
- Streaming uses Server-Sent Events (SSE) over HTTP. The SDK handles event parsing; you handle event semantics.
- LLM stream events have multiple types. Only content_block_delta with text_delta carries text. Most events carry metadata.
- Async generators are the natural Python primitive for streaming pipelines. Use async for to compose stages.
- FastAPI's StreamingResponse with media_type="text/event-stream" is the standard way to stream LLM output to browsers.
- Set X-Accel-Buffering: no when behind nginx. Without it, chunks are buffered and users see no streaming behavior.
- Always check for client disconnects and stop generation early. Running to completion for a disconnected client wastes tokens and money.
- Tool call arguments in streams arrive as JSON fragments. Accumulate them in a buffer; parse the complete JSON only when the content block stops.
- Test streaming endpoints by consuming the entire stream and asserting on the assembled output. Mock the SDK's stream context manager for unit tests.
Practice Problems
Problem 1: Implement a tee_stream async generator that takes a single LLM stream and yields chunks to two consumers simultaneously without making two API calls. (Hint: you will need a queue and tasks.)
Problem 2: Build a streaming endpoint that supports cancellation tokens. The client sends a cancel request to a separate endpoint, and the server stops the in-progress stream within 500ms. Use asyncio.Event for coordination.
Problem 3: The sentence_stream function in Part 4 has a bug: it can get stuck in an infinite loop if the model produces text without sentence-ending punctuation. Fix it, and add a test that covers this case.
Problem 4: Implement a streaming endpoint that also writes each chunk to a Redis pub/sub channel so that multiple browser tabs can subscribe to the same generation. Use the asyncio client in redis-py (redis.asyncio, which absorbed the now-deprecated aioredis package) for the pub/sub implementation.
Problem 5: Write a stream_with_timeout wrapper that raises asyncio.TimeoutError if more than N seconds pass between consecutive chunks (not total generation time). This detects hung streams where the model stops mid-generation but does not close the connection.
